package com.xdl.apitest.tabletest.udftest

import com.xdl.apitest.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * Project: FlinkTutorial
 * Package: com.xdl.apitest
 * Version: 1.0
 *
 * Created by guoxiaolong on 2020-08-01-22:21
 */
object ScalarFunctionTest {
  def main(args: Array[String]): Unit = {
    //0.创建流执行环境，读取数据并转换成样例类
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1.创建表执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. 读取数据转换成流， map 成样例类
    val filePath: String = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(filePath)

    //map 成样例类型
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

    //将流转换成表，直接定义时间字段
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)

    //使用自定义的 hash 函数, 求 id 的哈希值
    val hashCode = new HashCode(1.23)

    //Table API 调用方式
    val resultTable: Table = sensorTable
      .select('id, 'ts, hashCode('id))

    // SQL 调用方式，首先要注册表和函数
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("hashCode", hashCode)
    val resultSqlTable: Table = tableEnv.sqlQuery(
      """
        |select id, hashCode(id)
        |from sensor
        |""".stripMargin)

    // 转换成流打印输出
    resultTable.toAppendStream[Row].print("result")
    resultSqlTable.toAppendStream[Row].print("sql")

    env.execute("Scalar UDF test")


  }

  //自定义一个求 hash code 的标量函数
  class HashCode(factor: Double) extends ScalarFunction {
    def eval(value: String): Int = {
      (value.hashCode * factor.toInt)

    }
  }

}
